home *** CD-ROM | disk | FTP | other *** search
/ Total Network Tools 2002 / NextStepPublishing-TotalNetworkTools2002-Win95.iso / Archive / Misc Servers / Zope.exe / SELECT_TRIGGER.PY < prev    next >
Encoding:
Python Source  |  2000-06-02  |  8.9 KB  |  282 lines

  1. # -*- Mode: Python; tab-width: 4 -*-
  2.  
  3. VERSION_STRING = "$Id: select_trigger.py,v 1.14 2000/06/02 14:22:48 brian Exp $"
  4.  
  5. import asyncore
  6. import asynchat
  7.  
  8. import os
  9. import socket
  10. import string
  11. import thread
  12.     
  13. if os.name == 'posix':
  14.  
  15.     class trigger (asyncore.file_dispatcher):
  16.  
  17.         "Wake up a call to select() running in the main thread"
  18.  
  19.         # This is useful in a context where you are using Medusa's I/O
  20.         # subsystem to deliver data, but the data is generated by another
  21.         # thread.  Normally, if Medusa is in the middle of a call to
  22.         # select(), new output data generated by another thread will have
  23.         # to sit until the call to select() either times out or returns.
  24.         # If the trigger is 'pulled' by another thread, it should immediately
  25.         # generate a READ event on the trigger object, which will force the
  26.         # select() invocation to return.
  27.  
  28.         # A common use for this facility: letting Medusa manage I/O for a
  29.         # large number of connections; but routing each request through a
  30.         # thread chosen from a fixed-size thread pool.  When a thread is
  31.         # acquired, a transaction is performed, but output data is
  32.         # accumulated into buffers that will be emptied more efficiently
  33.         # by Medusa. [picture a server that can process database queries
  34.         # rapidly, but doesn't want to tie up threads waiting to send data
  35.         # to low-bandwidth connections]
  36.  
  37.         # The other major feature provided by this class is the ability to
  38.         # move work back into the main thread: if you call pull_trigger()
  39.         # with a thunk argument, when select() wakes up and receives the
  40.         # event it will call your thunk from within that thread.  The main
  41.         # purpose of this is to remove the need to wrap thread locks around
  42.         # Medusa's data structures, which normally do not need them.  [To see
  43.         # why this is true, imagine this scenario: A thread tries to push some
  44.         # new data onto a channel's outgoing data queue at the same time that
  45.         # the main thread is trying to remove some]
  46.  
  47.         def __init__ (self):
  48.             r, w = os.pipe()
  49.             self.trigger = w
  50.             asyncore.file_dispatcher.__init__ (self, r)
  51.             self.lock = thread.allocate_lock()
  52.             self.thunks = []
  53.  
  54.         def __repr__ (self):
  55.             return '<select-trigger (pipe) at %x>' % id(self)
  56.  
  57.         def readable (self):
  58.             return 1
  59.  
  60.         def writable (self):
  61.             return 0
  62.  
  63.         def handle_connect (self):
  64.             pass
  65.  
  66.         def pull_trigger (self, thunk=None):
  67.             # print 'PULL_TRIGGER: ', len(self.thunks)
  68.             if thunk:
  69.                 try:
  70.                     self.lock.acquire()
  71.                     self.thunks.append (thunk)
  72.                 finally:
  73.                     self.lock.release()
  74.             os.write (self.trigger, 'x')
  75.  
  76.         def handle_read (self):
  77.             self.recv (8192)
  78.             try:
  79.                 self.lock.acquire()
  80.                 for thunk in self.thunks:
  81.                     try:
  82.                         thunk()
  83.                     except:
  84.                         (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
  85.                         print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
  86.                 self.thunks = []
  87.             finally:
  88.                 self.lock.release()
  89.  
  90. else:
  91.  
  92.     # win32-safe version
  93.  
  94.     class trigger (asyncore.dispatcher):
  95.  
  96.         address = ('127.9.9.9', 19999)
  97.  
  98.         def __init__ (self):
  99.             a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
  100.             w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
  101.  
  102.             # set TCP_NODELAY to true to avoid buffering
  103.             w.setsockopt(socket.IPPROTO_TCP, 1, 1)
  104.  
  105.             # tricky: get a pair of connected sockets
  106.             host='127.0.0.1'
  107.             port=19999
  108.             while 1:
  109.                 try:
  110.                     self.address=(host, port)
  111.                     a.bind(self.address)
  112.                     break
  113.                 except:
  114.                     if port <= 19950:
  115.                         raise 'Bind Error', 'Cannot bind trigger!'
  116.                     port=port - 1
  117.             
  118.             a.listen (1)
  119.             w.setblocking (0)
  120.             try:
  121.                 w.connect (self.address)
  122.             except:
  123.                 pass
  124.             r, addr = a.accept()
  125.             a.close()
  126.             w.setblocking (1)
  127.             self.trigger = w
  128.  
  129.             asyncore.dispatcher.__init__ (self, r)
  130.             self.lock = thread.allocate_lock()
  131.             self.thunks = []
  132.             self._trigger_connected = 0
  133.  
  134.         def __repr__ (self):
  135.             return '<select-trigger (loopback) at %x>' % id(self)
  136.  
  137.         def readable (self):
  138.             return 1
  139.  
  140.         def writable (self):
  141.             return 0
  142.  
  143.         def handle_connect (self):
  144.             pass
  145.  
  146.         def pull_trigger (self, thunk=None):
  147.             if thunk:
  148.                 try:
  149.                     self.lock.acquire()
  150.                     self.thunks.append (thunk)
  151.                 finally:
  152.                     self.lock.release()
  153.             self.trigger.send ('x')
  154.  
  155.         def handle_read (self):
  156.             self.recv (8192)
  157.             try:
  158.                 self.lock.acquire()
  159.                 for thunk in self.thunks:
  160.                     try:
  161.                         thunk()
  162.                     except:
  163.                         (file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
  164.                         print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
  165.                 self.thunks = []
  166.             finally:
  167.                 self.lock.release()
  168.  
  169.  
  170. the_trigger = None
  171.  
  172. class trigger_file:
  173.  
  174.     "A 'triggered' file object"
  175.  
  176.     buffer_size = 4096
  177.  
  178.     def __init__ (self, parent):
  179.         global the_trigger
  180.         if the_trigger is None:
  181.             the_trigger = trigger()
  182.         self.parent = parent
  183.         self.buffer = ''
  184.         
  185.     def write (self, data):
  186.         self.buffer = self.buffer + data
  187.         if len(self.buffer) > self.buffer_size:
  188.             d, self.buffer = self.buffer, ''
  189.             the_trigger.pull_trigger (
  190.                 lambda d=d,p=self.parent: p.push (d)
  191.                 )
  192.  
  193.     def writeline (self, line):
  194.         self.write (line+'\r\n')
  195.         
  196.     def writelines (self, lines):
  197.         self.write (
  198.             string.joinfields (
  199.                 lines,
  200.                 '\r\n'
  201.                 ) + '\r\n'
  202.             )
  203.  
  204.     def flush (self):
  205.         if self.buffer:
  206.             d, self.buffer = self.buffer, ''
  207.             the_trigger.pull_trigger (
  208.                 lambda p=self.parent,d=d: p.push (d)
  209.                 )
  210.  
  211.     def softspace (self, *args):
  212.         pass
  213.  
  214.     def close (self):
  215.         # in a derived class, you may want to call trigger_close() instead.
  216.         self.flush()
  217.         self.parent = None
  218.  
  219.     def trigger_close (self):
  220.         d, self.buffer = self.buffer, ''
  221.         p, self.parent = self.parent, None
  222.         the_trigger.pull_trigger (
  223.             lambda p=p,d=d: (p.push(d), p.close_when_done())
  224.             )
  225.  
  226. if __name__ == '__main__':
  227.     
  228.     import time
  229.  
  230.     def thread_function (output_file, i, n):
  231.         print 'entering thread_function'
  232.         while n:
  233.             time.sleep (5)
  234.             output_file.write ('%2d.%2d %s\r\n' % (i, n, output_file))
  235.             output_file.flush()
  236.             n = n - 1
  237.         output_file.close()
  238.         print 'exiting thread_function'
  239.  
  240.     class thread_parent (asynchat.async_chat):
  241.         
  242.         def __init__ (self, conn, addr):
  243.             self.addr = addr
  244.             asynchat.async_chat.__init__ (self, conn)
  245.             self.set_terminator ('\r\n')
  246.             self.buffer = ''
  247.             self.count = 0
  248.  
  249.         def collect_incoming_data (self, data):
  250.             self.buffer = self.buffer + data
  251.  
  252.         def found_terminator (self):
  253.             data, self.buffer = self.buffer, ''
  254.             if not data:
  255.                 asyncore.close_all()
  256.                 print "done"
  257.                 return
  258.             n = string.atoi (string.split (data)[0])
  259.             tf = trigger_file (self)
  260.             self.count = self.count + 1
  261.             thread.start_new_thread (thread_function, (tf, self.count, n))
  262.  
  263.     class thread_server (asyncore.dispatcher):
  264.  
  265.         def __init__ (self, family=socket.AF_INET, address=('', 9003)):
  266.             asyncore.dispatcher.__init__ (self)
  267.             self.create_socket (family, socket.SOCK_STREAM)
  268.             self.set_reuse_addr()
  269.             self.bind (address)
  270.             self.listen (5)
  271.  
  272.         def handle_accept (self):
  273.             conn, addr = self.accept()
  274.             tp = thread_parent (conn, addr)
  275.  
  276.     thread_server()
  277.     #asyncore.loop(1.0, use_poll=1)
  278.     try:
  279.         asyncore.loop ()
  280.     except:
  281.         asyncore.close_all()
  282.